In [1]:
import pandas as pd
import numpy as np
import subprocess
import os
import plotly.express as px
import plotly.graph_objects as go
import math

Concept definition

  • file system -- the tested posix file system, including distributed ones, such as glusterfs, and single-host ones, such as xfs.
  • file -- the file written and read on file system.
  • record -- the entity written to file which has a fix length for one file.
  • worker -- a client of file system, technically is a process occupying a core of CPU.
  • combination -- combines arguments of length of record, number of record, files for per worker and number of workers.
  • epoch -- a turn of testing file system with an unique combination.

Aim to archieve

  • evaluate the reliability of file system.
  • find combinations of record length, record number, files, workers for the best whole effiency of host and the best process efficency.

Steps to use this fsXT

  • Before do the following steps, you should build (make) the source, the executive file fsXT will be generated and saved to fsXT/src/fsXT directory where this notebook file is in.
  • Step 1: Please adjust as your wish if the following default values of arguments are inapproporiate.
  • Step 2: Generate test scripts.
  • Step 3: Run test and wait until all epochs finished.
  • Step 4: Collect data from log files of epochs.
  • Step 5: Wrange data.
  • Step 6: Evaluate reliablity by comparing md5 digest with writing and md5 digest with reading for each file.
  • Step 7: Analyze correlationship based on epoch speed, file write speed and file read speed
  • Step 8: Score combinations on epoch speed 70%, file write speed 20%, file read speed 10%, and show top 10.
  • Step 9: Draw I/O graphs of top 10.

Step 1: Please adjust as your wish if the following default values of arguments are inapproporiate.

  • fsXT will invoke executive ttfs many times. Each time is an epoch which start worker processes according to LST_WORKERS.
  • Each worker gets record length, number of records per file, and caculate number of files by TOTAL_LOAD_PER_EPOCH / (WORKERS RECORDLEN RECORDNUM)
In [2]:
WRITE_PATH = "/mnt/ttfs"
READ_PATH = "/mnt1/ttfs"
TOTAL_LOAD_PER_EPOCH = 1024*1024*256         #256MB
LST_WORKERS = [20,24,28,32,36,40,44,48,52,56,60,64]
LST_RECORDLEN = [256,512,768,1024]
LST_RECORDNUM = [4096, 8192, 12288]

Not a step, following is core source code. Please don't modify the following codes unless you really know what you want to do.

In [3]:
class fsXT(object):
    def __init__(self):
        self.epochs = dict()   
        self.len_epochs = 0
        self.pwd = os.getcwd()
        self.testscriptfile = '%s/batch'%self.pwd

    def generate_data(self):
        es = set()
        for arg_p in LST_WORKERS:
            for arg_l in LST_RECORDLEN:
                for arg_n in LST_RECORDNUM:
                    arg_f = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p*arg_l*arg_n)))
                    arg_n = max(1, int(TOTAL_LOAD_PER_EPOCH/(arg_p*arg_f*arg_l)))

                    cmdline = "%s/ttfs -w%s -r%s -p%d -l%d -n%d -f%d" % \
                            (self.pwd, WRITE_PATH, READ_PATH, arg_p, arg_l, arg_n, arg_f)
                    es.add(cmdline)

        sf = open(self.testscriptfile,'w')
        arg_e = 1
        for i in es:
            sf.write((i+' -e%d\n'%arg_e))
            arg_e += 1
        sf.close()
        self.len_epochs = len(es)

        print('The test script file was generated and save in %s'%self.testscriptfile)
            


    def run_test(self):
        cmdline = 'chmod +x %s;nohup %s &'%(self.testscriptfile, self.testscriptfile)
        print(cmdline)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('NOTICE: THIS IS A TIME CONSUMING JOB. YOU SHOULD WAIT UNTIL ALL EPOCHS FINISHED. PLEASE CHECK ITS STATUS ON HOST.')

        
    def collect_data(self):
        cmdline = \
"""
mkdir -p /tmp/fsXTlog;pushd /tmp/fsXTlog;rm files.log timeticks.log epochs.log -f;mv ../ttfslog.???? ./ -f;
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^1,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> files.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^2,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> timeticks.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^3,"|cut -f1 -d"," --complement >> epochs.log;done
"""%(self.len_epochs, self.len_epochs, self.len_epochs)
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
        process.communicate()
        print('Finished collecting data from /tmp/ttfslog.*, and save data to /tmp/fsXTlog/files.log, timeticks.log and epochs.log ')
        
        
    def wrangle_data(self):        
        self.df_epochs = pd.read_csv("/tmp/fsXTlog/epochs.log", header=None)
        self.df_epochs.columns = ['epoch', 'workers', 'record_length', 'record_number', 'files', 'start_timestamp', 'end_timestamp']
        self.df_files = pd.read_csv("/tmp/fsXTlog/files.log", header=None)
        self.df_files.columns = ['epoch', 'worker_id', 'filename','record_length', 'record_number', 'md5_write', 'md5_read', 
                                 'tlen_open','tlen_close','tlen_mv','tlen_write', 'tlen_read']
        self.df_timeticks = pd.read_csv("/tmp/fsXTlog/timeticks.log", header=None)
        self.df_timeticks.columns = ['epoch', 'worker_id', 'checkpoint','unit_wbytes', 'unit_rbytes', 'elapsed_time']
        self.df_timeticks['unit_wbytes'] = self.df_timeticks['unit_wbytes']/1024**2
        self.df_timeticks['unit_rbytes'] = self.df_timeticks['unit_rbytes']/1024**2
    
        self.df_epochs['total_files'] = self.df_epochs['files']*self.df_epochs['workers']
        self.df_epochs['total_records'] = self.df_epochs['total_files']*self.df_epochs['record_number']
        self.df_epochs['file_size'] = self.df_epochs['record_length']*self.df_epochs['record_number']
        self.df_epochs['duration'] = self.df_epochs['end_timestamp'] -  self.df_epochs['start_timestamp']
        self.df_epochs['total_load'] = self.df_epochs['total_files']*self.df_epochs['file_size']
        self.df_epochs['e_speed'] = self.df_epochs['total_load']/self.df_epochs['duration']/1024**2
        self.df_epochs.index = self.df_epochs['epoch']
        self.df_epochs.index.name = ""
        
        self.df_files['tlen_file'] = self.df_files['tlen_open']+self.df_files['tlen_close'] \
                +self.df_files['tlen_mv']+self.df_files['tlen_write']+self.df_files['tlen_read']
        
        self.df_files.drop(columns = ['record_length', 'record_number'], inplace=True)
        self.df_files = pd.merge(self.df_files, self.df_epochs, how='left', left_on=['epoch'], right_on = ['epoch'])
        self.df_files['fw_speed'] = self.df_files['file_size']/self.df_files['tlen_write']/1024**2
        self.df_files['fr_speed'] = self.df_files['file_size']/self.df_files['tlen_read']/1024**2
        
        df_score = xt.df_files[['e_speed','fw_speed','fr_speed']].agg(['min','max'])
        df_score = df_score.append(pd.Series(df_score.loc['max'] - df_score.loc['min'], name='range'))

        df_score_rst = self.df_files[['epoch','e_speed','fw_speed', 'fr_speed']].groupby(['epoch']).mean()
        df_score_rst['epoch_score'] = (df_score_rst['e_speed'] - df_score.loc['min','e_speed'])/df_score.loc['range','e_speed']*100.0
        df_score_rst['fw_score'] = (df_score_rst['fw_speed'] - df_score.loc['min','fw_speed'])/df_score.loc['range','fw_speed']*100.0
        df_score_rst['fr_score'] = (df_score_rst['fr_speed'] - df_score.loc['min','fr_speed'])/df_score.loc['range','fr_speed']*100.0
        df_score_rst['score'] = df_score_rst['epoch_score']*0.7+df_score_rst['fw_score']*0.2+df_score_rst['fr_score']*0.1
        self.df_epochs = pd.merge(self.df_epochs, df_score_rst[['fw_speed', 'fr_speed','epoch_score','fw_score','fr_score','score']], \
                 how='inner', left_index=True, right_index=True)
        self.df_epochs = self.df_epochs.sort_values('score', ascending=False).reset_index(drop=True)
        self.df_epochs['rank'] = self.df_epochs.index + 1
        print('Finished wrangling data.')
        

    def eval_reliability(self):
        df_nonreliability = self.df_files.query('md5_write!=md5_read')
        if len(df_nonreliability) > 0:
            print('!!!!!!! Fail to pass Reliablity Test !!!!!!!')
            print('------The following files have different digests of writing and reading:-----')
            print(self.df_files)
        else:
            print('### Success to pass Reliablity Test ###')
            
    def analyze_correlationship(self):
        tmpdf = self.df_epochs[['workers', 'record_length', 'record_number', 'files', \
                            'total_files', 'total_records', 'file_size', 'e_speed']]
        self.__analyze_correlationship(tmpdf, 'e_speed', 'EPOCHS PERFORMACE')
        
        tmpdf = self.df_files[['workers', 'record_length', 'record_number', 'total_records',\
                            'file_size', 'files','total_files', 'fw_speed', 'fr_speed']]
        self.__analyze_correlationship(tmpdf, 'fw_speed', 'WORKER PERFORMANCE')
        self.__analyze_correlationship(tmpdf, 'fr_speed', 'WORKER PERFORMANCE')
        

    def __analyze_correlationship(self, df, basic_col, df_label):
        sr_corr = df.corr()[basic_col]
        sr_corr.dropna(inplace=True)
        sr_corr.drop(labels=[basic_col], inplace=True)

        order = sr_corr.abs().sort_values(ascending = False)
        
        print('### Analyzing result of %s based on "%s" ###' % (df_label, basic_col)  )
        print(sr_corr[order.index])
        #print('--- the most factor influencing "%s" is "%s"'%(basic_col,order.index[0]))
            

    def histogram_wrspeed(self, li):
        i = 1
        for e in li:
            tmpdf = self.df_timeticks.query('epoch==%d'%e)

            fig = go.Figure()
            trace_write = go.Histogram(histfunc="sum", y=tmpdf['unit_wbytes'], x=tmpdf['elapsed_time'], 
                                       name="Epoch%d_writing"%e, xbins=go.histogram.XBins(start=0, size=0.1))

            trace_read = go.Histogram(histfunc="sum", y=tmpdf['unit_rbytes'], x=tmpdf['elapsed_time'], 
                                       name="Epoch%d_reading"%e, xbins=go.histogram.XBins(start=0, size=0.1))

            fig.add_trace(trace_write)
            fig.add_trace(trace_read)

            es = xt.df_epochs.loc[e]
            fig.update_layout(barmode="stack",bargap=0.1, 
                              xaxis_title='Elapsed time (seconds)',
                              yaxis_title='MBytes')
            print('Rank: %d\n'\
                'Epoch: %d, Spent time: %.2f seconds,\n'\
                'Record length: %d, Records: %d,\n'
                'Workers: %d, Total files: %d'%
                (i,e, es['duration'], es['record_length'],\
                es['record_number'], es['workers'], es['total_files']))

            i += 1
            fig.show()

Step 2: Generate test scripts.

In [4]:
xt = fsXT()
xt.generate_data()
The test script file was generated and save in /root/codesdir/fsXT/src/fsXT/batch

Step 3: Run test and wait until all epochs finished. NOTICE -- THIS IS A TIME CONSUMING JOB.

In [5]:
#xt.run_test()

Step 4: Collect data from log files of epochs.

In [6]:
xt.collect_data()
Finished collecting data from /tmp/ttfslog.*, and save data to /tmp/fsXTlog/files.log, timeticks.log and epochs.log 

Step 5: Wrange data.

In [7]:
xt.wrangle_data()
Finished wrangling data.

Step 6: Evaluate reliablity by comparing md5 digest with writing and md5 digest with reading for each file.

In [8]:
xt.eval_reliability()
### Success to pass Reliablity Test ###

Step 7: Analyze correlationship based on epoch speed, file write speed and file read speed

In [9]:
xt.analyze_correlationship()
### Analyzing result of EPOCHS PERFORMACE based on "e_speed" ###
record_length    0.895476
total_records   -0.800307
total_files     -0.487757
file_size        0.484239
files           -0.345987
record_number   -0.255739
workers         -0.223157
Name: e_speed, dtype: float64
### Analyzing result of WORKER PERFORMANCE based on "fw_speed" ###
record_length    0.710429
total_records   -0.635789
workers         -0.507266
file_size        0.479296
total_files     -0.415885
fr_speed        -0.180023
files           -0.118743
record_number   -0.093798
Name: fw_speed, dtype: float64
### Analyzing result of WORKER PERFORMANCE based on "fr_speed" ###
file_size       -0.492793
total_files      0.465953
record_length   -0.374628
files            0.361422
total_records    0.354417
record_number   -0.299410
fw_speed        -0.180023
workers          0.009179
Name: fr_speed, dtype: float64

Step 8: Score combinations on epoch speed 70%, file write speed 20%, file read speed 10%

Top 10

In [10]:
# index order are identical to rank order, so no need sort.
xt.df_epochs[:10]\
    [['rank', 'epoch','workers','record_length','record_number',\
      'duration','total_files','e_speed', 'fw_speed', 'fr_speed',\
       'epoch_score', 'fw_score', 'fr_score', 'score']]
Out[10]:
rank epoch workers record_length record_number duration total_files e_speed fw_speed fr_speed epoch_score fw_score fr_score score
0 1 38 32 1024 8192 15.599013 32 16.411295 0.561827 40.823830 100.000000 49.095928 15.530386 81.372224
1 2 51 28 1024 4681 16.220592 56 15.781926 0.667206 20.668783 95.822399 58.643508 6.838554 79.488236
2 3 46 32 1024 4096 21.477083 64 11.919682 0.532969 44.664500 70.185719 46.481380 17.186668 60.144946
3 4 56 56 1024 4681 20.630908 56 12.408188 0.236016 15.104839 73.428309 19.576775 4.439112 55.759083
4 5 18 36 1024 7281 25.213650 36 10.152146 0.298857 12.316745 58.453226 25.270282 3.236751 46.294990
5 6 80 20 1024 4369 28.315550 60 9.040831 0.514088 38.568995 51.076574 44.770669 14.557992 46.163534
6 7 63 48 1024 5461 25.882375 48 9.890297 0.217578 12.475834 56.715133 17.906229 3.305358 43.612375
7 8 86 28 1024 9362 27.863066 28 9.187510 0.356923 10.574126 52.050194 30.531201 2.485249 42.789901
8 9 11 64 1024 4096 26.724644 64 9.579173 0.156957 14.323673 54.649968 12.413877 4.102235 41.147976
9 10 83 24 1024 5461 30.928032 48 8.276775 0.372638 32.290110 46.004952 31.955020 11.850232 39.779494

Bottom 10

In [11]:
xt.df_epochs[-10:]\
    [['rank', 'epoch','workers','record_length','record_number',\
      'duration','total_files','e_speed', 'fw_speed', 'fr_speed',\
       'epoch_score', 'fw_score', 'fr_score', 'score']]
Out[11]:
rank epoch workers record_length record_number duration total_files e_speed fw_speed fr_speed epoch_score fw_score fr_score score
89 90 74 64 256 8192 140.611633 128 1.820618 0.029542 55.322677 3.150478e+00 0.869763 21.782990 4.557586
90 91 9 60 256 8738 159.007415 120 1.609963 0.027158 78.737787 1.752204e+00 0.653827 31.880719 4.545380
91 92 88 48 256 21845 123.726283 48 2.069052 0.043795 20.284712 4.799527e+00 2.161125 6.672924 4.459186
92 93 69 60 256 4369 161.798641 240 1.582189 0.027693 78.025996 1.567847e+00 0.702211 31.573761 4.395311
93 94 87 36 256 14563 134.616349 72 1.901628 0.055299 31.128151 3.688207e+00 3.203391 11.349140 4.357337
94 95 33 48 256 10922 143.281690 96 1.786581 0.037839 43.384005 2.924553e+00 1.621549 16.634457 4.034942
95 96 13 52 256 20164 143.955995 52 1.778240 0.034713 30.940504 2.869184e+00 1.338275 11.268217 3.402906
96 97 47 56 256 18724 140.973785 56 1.815885 0.032900 12.058381 3.119065e+00 1.173985 3.125332 2.730675
97 98 96 60 256 17476 151.460154 60 1.690188 0.028573 12.941068 2.284715e+00 0.782001 3.505989 2.106300
98 99 71 64 256 16384 190.194802 64 1.345988 0.021264 26.558430 2.947761e-15 0.119802 9.378455 0.961806

Step 9: Draw I/O graphs

Top 3 histogram with bins of 0.1 second

In [12]:
xt.histogram_wrspeed(xt.df_epochs.sort_values(['score'],ascending=False)[:3]['epoch'])
Rank: 1
Epoch: 38, Spent time: 54.70 seconds,
Record length: 512, Records: 5957,
Workers: 44, Total files: 88
Rank: 2
Epoch: 51, Spent time: 66.78 seconds,
Record length: 512, Records: 5041,
Workers: 52, Total files: 104
Rank: 3
Epoch: 46, Spent time: 60.74 seconds,
Record length: 512, Records: 21845,
Workers: 24, Total files: 24

Bottom 3 histogram with bins of 0.1 second

In [13]:
xt.histogram_wrspeed(xt.df_epochs.sort_values(['score'],ascending=False)[-3:]['epoch'])
Rank: 1
Epoch: 47, Spent time: 65.51 seconds,
Record length: 512, Records: 10922,
Workers: 24, Total files: 48
Rank: 2
Epoch: 96, Spent time: 140.97 seconds,
Record length: 256, Records: 18724,
Workers: 56, Total files: 56
Rank: 3
Epoch: 71, Spent time: 91.96 seconds,
Record length: 512, Records: 8192,
Workers: 64, Total files: 64

Top 3 and bottom 3 line plot for writing scaled on seconds.

In [14]:
xt.df_timeticks['bin_second'] = xt.df_timeticks['elapsed_time'].apply(math.ceil)
df_wrspeed=xt.df_timeticks[['epoch', 'bin_second', 'unit_wbytes', 'unit_rbytes']].groupby(['epoch', 'bin_second']).sum()
df_wrspeed['elapsed_seconds'] = df_wrspeed.index.get_level_values(1)
df_wrspeed['EPOCH'] = df_wrspeed.index.get_level_values(0)
df_wrspeed['rank'] = df_wrspeed.index.get_level_values(0)
kvs= {k:v for k, v in xt.df_epochs[['epoch', 'rank']].T.apply(lambda i: (i['epoch'],i['rank']))}
df_wrspeed['rank'] = df_wrspeed['rank'].map(kvs)
top3_bottom3 = list(xt.df_epochs[:3].index)\
        + list(xt.df_epochs[-3:].index)
fig = px.line(df_wrspeed.query('epoch in %s'% 
        list(xt.df_epochs['epoch'].loc[top3_bottom3])), 
        x='elapsed_seconds', y="unit_wbytes", color="rank",
        line_group="rank", hover_name="EPOCH")
fig.show()

Box plot of writing speed of files

In [15]:
fig = px.box(xt.df_files, x="epoch", y="fw_speed")
fig.show()

Box plot of reading speed of files

In [16]:
fig = px.box(xt.df_files, x="epoch", y="fr_speed")
fig.show()
In [17]:
#df=xt.df_files
#df=df.set_index([df.epoch,df.index])
#df.std(level=0).sort_values(['fw_speed','fr_speed'])
In [ ]:
 

Inference

Precondition: the same data load to writing

  • For writing efficenccy both epoch (or host) and worker (or process), the length of record longer the efficency better, and the total number of record less the efficency better.